package net.quanter.shield.mq.rocketmq.consumer;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class RocketMQHttpConsumner implements MQConsumer {

    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final RocketMQConsumerHttpParam[] mqConsumerConnectVOS;
    private volatile boolean run = true;

    public RocketMQHttpConsumner(
            RocketMQBorkerParam mqConnectVO,
            String groupId,
            RocketMQConsumerParam... rocketMQConsumerParams) {
        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        mqConsumerConnectVOS = new RocketMQConsumerHttpParam[rocketMQConsumerParams.length];
        for (int i = 0; i < rocketMQConsumerParams.length; i++) {
            mqConsumerConnectVOS[i] = new RocketMQConsumerHttpParam(rocketMQConsumerParams[i]);
        }
        MQClient mqClient = new MQClient(
                mqConnectVO.getEndPoint(),
                mqConnectVO.getAccessId(),
                mqConnectVO.getAccessKey()
        );

        for (RocketMQConsumerHttpParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            mqConsumerConnectVO.setMqClient(mqClient);
            if (StringUtils.isNoneBlank(mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getInstanceId())) {
                com.aliyun.mq.http.MQConsumer mqConsumer = mqClient.getConsumer(
                        mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getInstanceId(),
                        mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getName(),
                        groupId, null);
                mqConsumerConnectVO.setConsumer(mqConsumer);
            } else {
                com.aliyun.mq.http.MQConsumer mqConsumer = mqClient.getConsumer(mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getName(), groupId);
                mqConsumerConnectVO.setConsumer(mqConsumer);
            }
        }
    }

    @Override
    public void start(RunType runType) {
        for (RocketMQConsumerHttpParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            //这里直接用野线程不用线程组
            //1.考虑到是开发环境才会使用http连接，不需要考虑线程组问题
            //2.必须保证订阅每个topic的线程一定在运行。
            if (runType == RunType.SYNC) {
                startHttpConsumer(mqConsumerConnectVO);
            } else {
                Thread thread = new Thread(() -> {
                    startHttpConsumer(mqConsumerConnectVO);
                });
                thread.start();
            }
        }
    }


    public void startHttpConsumer(
            final RocketMQConsumerHttpParam mqConsumerConnectVO) {
        log.info("http consumer[{}] starting listening", mqConsumerConnectVO);
        // 在当前线程循环消费消息，建议多开个几个线程并发消费消息。
        do {
            List<Message> messages = null;
            try {
                // 长轮询消费消息。
                // 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s，3s内如果有消息可以消费则立即返回客户端。
                messages = mqConsumerConnectVO.getConsumer().consumeMessage(
                        3,// 一次最多消费3条消息（最多可设置为16条）。
                        3// 长轮询时间3秒（最多可设置为30秒）。
                );
            } catch (Throwable e) {
                if (e.getMessage().contains("Message not exist.")) {
                    log.debug("no more messages");
                } else {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {

                }
            }
            // Topic中没有消息可消费。
            if (messages == null || messages.isEmpty()) {
                log.debug(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }

            List<String> handles = new ArrayList<String>();
            // 处理业务逻辑。
            for (Message message : messages) {
                try {
                    MQMessageVO MQMessageVO = fromRocketHttpMessage(mqConsumerConnectVO.rocketMQConsumerParam.getListener().getMessageType(), message);
                    boolean success = mqConsumerConnectVO.rocketMQConsumerParam.getListener().process(MQMessageVO);
                    if (success) {
                        handles.add(message.getReceiptHandle());
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                    continue;
                }
            }

            // 消息重试时间到达前若不确认消息消费成功，则消息会被重复消费。
            // 消息句柄有时间戳，同一条消息每次消费的时间戳都不一样。
            try {
                if (handles != null && !handles.isEmpty()) {
                    mqConsumerConnectVO.getConsumer().ackMessage(handles);
                }
            } catch (Throwable e) {
                // 某些消息的句柄可能超时，会导致消息消费状态确认不成功。
                if (e instanceof AckMessageException) {
                    AckMessageException errors = (AckMessageException) e;
                    log.error("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                    if (errors.getErrorMessages() != null) {
                        for (String errorHandle : errors.getErrorMessages().keySet()) {
                            log.error("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                    + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                        }
                    }
                    continue;
                }
                e.printStackTrace();
            }
        } while (run);
        log.info("http consumer[{}] is stoped!", mqConsumerConnectVO);
    }

    @Override
    public void stop() {
        run = false;
    }


    public static <T> MQMessageVO<T> fromRocketHttpMessage(Type type, Message message) {
        MQMessageVO MQMessageVO = new MQMessageVO();
        if (message.getProperties() != null) {
            MQMessageVO.putAll(message.getProperties());
        }
        MQMessageVO.setMessageMD5(message.getMessageBodyMD5());
        MQMessageVO.putAll(message.getProperties());
        MQMessageVO.setShardKey(message.getShardingKey());
        MQMessageVO.setTag(message.getMessageTag());
        MQMessageVO.setConsumedTimes(message.getConsumedTimes());
        MQMessageVO.setPublishTime(message.getPublishTime());
        MQMessageVO.setNextConsumeTime(message.getNextConsumeTime());
        MQMessageVO.setFirstConsumeTime(message.getFirstConsumeTime());
        MQMessageVO.setMessageId(message.getMessageId());
        MQMessageVO.setRequestId(message.getRequestId());
        byte[] bytes = message.getMessageBodyBytes();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        MQMessageVO.setObj(body);
        return MQMessageVO;
    }
}
